package net.bwie.realtime.dws.log.function;

import net.bwie.realtime.dws.log.bean.PageViewBean;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Window function that sums the page-view metrics of all {@link PageViewBean}
 * elements in a keyed time window and emits the result as one comma-separated line.
 *
 * <p>Output field order:
 * {@code windowStart,windowEnd,date,key,uvCount,sessionCount,pvCount,pvDuringTime},
 * where {@code windowStart}/{@code windowEnd} use the pattern {@code yyyy-MM-dd HH:mm:ss}
 * and {@code date} is the first 10 characters ({@code yyyy-MM-dd}) of {@code windowStart}.
 */
public class PageViewWindowFunction implements WindowFunction<PageViewBean,String,String, TimeWindow> {

    // Formatter for the window boundaries. No time zone is given, so it uses the default
    // time zone of the JVM in which this instance is created.
    private final FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    /**
     * Aggregates every element of the window and emits a single CSV line.
     *
     * @param key    grouping key of the window; written verbatim as the fourth output field
     * @param window time window whose start and end timestamps are formatted into the output
     * @param input  elements assigned to this window for the given key
     * @param out    collector that receives exactly one line per invocation
     * @throws Exception declared by the {@link WindowFunction} contract
     */
    @Override
    public void apply(String key, TimeWindow window,
                      Iterable<PageViewBean> input, Collector<String> out) throws Exception {

        // Render the window start and end timestamps as text.
        String windowStartTime = format.format(window.getStart());
        String windowEndTime = format.format(window.getEnd());

        // Sum the metrics with primitive accumulators so the loop does not
        // box and unbox a Long on every element.
        long pvCount = 0L;
        long pvDuringTime = 0L;
        long uvCount = 0L;
        long sessionCount = 0L;
        for (PageViewBean pageViewBean : input) {
            pvCount += pageViewBean.getPvCount();
            pvDuringTime += pageViewBean.getPvDuringTime();
            uvCount += pageViewBean.getUvCount();
            sessionCount += pageViewBean.getSessionCount();
        }

        // Build the output line; the third field is the date part (yyyy-MM-dd)
        // taken from the formatted window start.
        String output = windowStartTime + "," + windowEndTime + "," + windowStartTime.substring(0, 10)
                + "," + key
                + "," + uvCount + "," + sessionCount + "," + pvCount + "," + pvDuringTime;

        out.collect(output);
    }
}
